-
Notifications
You must be signed in to change notification settings - Fork 1.2k
send old version memoryQueue's stale activation to queueManager when update action #5228
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
send old version memoryQueue's stale activation to queueManager when update action #5228
Conversation
When old container doesn't exist
@@ -97,7 +96,7 @@ class CreationJobManager(feedFactory: (ActorRefFactory, String, String, Int, Arr | |||
if (retryCount >= retryLimit || !error.exists(ContainerCreationError.whiskErrors.contains)) { | |||
logging.error( | |||
this, | |||
s"[$creationId] Failed to create container $retryCount/$retryLimit times for $cause. Finished creation") | |||
s"[$action] [$creationId] Failed to create container $retryCount/$retryLimit times for $cause. Finished creation") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make it easy to debug in future maybe.
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #5228 +/- ##
===========================================
+ Coverage 44.53% 72.98% +28.44%
===========================================
Files 238 238
Lines 13957 13965 +8
Branches 570 576 +6
===========================================
+ Hits 6216 10192 +3976
+ Misses 7741 3773 -3968 ☔ View full report in Codecov by Sentry. |
@@ -561,6 +561,20 @@ class MemoryQueue(private val etcdClient: EtcdClient, | |||
// let QueueManager know this queue is no longer in charge. | |||
context.parent ! staleQueueRemovedMsg | |||
|
|||
if (queue.size > 0) { | |||
// if doesn't exist old container to pull old memoryQueue's activation, complete the activation directly | |||
if (containers.size == 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to consider containers being created(in-progress containers).
And it's worth discussing but IIRC, in our downstream, we decided to promote the old version of activations to the latest one rather than just completing them with errors.
If we do this, I think QueueManager should act in the same way for stale activations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Regarding I think we need to consider containers being created(in-progress containers).
I think have no need to consider in-progress containers during action update in this case, because
- Normally, the in-progress containers will be failed due to version not matched: https://github.com/apache/openwhisk/blob/master/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/ContainerMessageConsumer.scala#L85
- if above #L85 codes happens, has a option to create container using latest version of action, but if we do this, the new version container can't fetch the old version memoryQueue's stale activation as well.
So here, i think just consider if (containers.size == 0)
is enough.
Regarding
And it's worth discussing but IIRC, in our downstream, we decided to promote the old version of activations to the latest one rather than just completing them with errors.
If we do this, I think QueueManager should act in the same way for stale activations.
when this case happens and have no existing old version container, just send the stale activation to queueManager to schedule to new version memoryQueue, and the activations will be fetched by new version container.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, so this is to promote the activation revision by sending them to QueueManager.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes
3185af0
to
7715ad5
Compare
probe.expectMsg(Transition(fsm, state, Removing)) | ||
|
||
fsm ! QueueRemovedCompleted | ||
|
||
// queue should not be terminated as there is an activation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why these lines are removed? since there is a container in https://github.com/apache/openwhisk/pull/5228/files#diff-4b5788222d7949baf4e09b570e4fa49d3fa79e5432ff50c798b04353758c8d4aR1539, I think message should not be sent to QueueManager
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, fixed.
@@ -1496,6 +1497,7 @@ class MemoryQueueFlowTests | |||
// queue should not be terminated as there is an activation | |||
Thread.sleep(gracefulShutdownTimeout.toMillis) | |||
|
|||
fsm.underlyingActor.containers = Set(testContainerId) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
adding this line covers case that there is still a container for old queue, but does not cover case that there is no container for old queue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated accordingly.
memoryQueue.creationIds.size shouldBe 1 | ||
// the monit actor in memoryQueue may decide to create a container | ||
memoryQueue.creationIds.size should be >= 1 | ||
memoryQueue.creationIds.size should be <= 2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Due to this codes: https://github.com/apache/openwhisk/blob/master/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala#L908
Above #L908 code is good due to store the inprogressId to etcd needs some time
, so when containerCreationMessage created, need to add that creationId to creationIds in advance.
This test case may not stable, e.g. due to the creationId(testId1)
doesn't keep consistent with CreationId.generate()
, if the monit actor decide to create a container very quickly, this test case would be failed.
So here, make it stable.
@@ -554,7 +554,7 @@ class QueueManagerTests | |||
mockConsumer, | |||
QueueManagerConfig(maxRetriesToGetQueue = 2, maxSchedulingTime = 10 seconds))) | |||
|
|||
queueManager ! activationMessage | |||
queueManager ! activationMessage.copy(transid = TransactionId(messageTransId.meta.copy(start = Instant.now()))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make this test case more stable
@@ -829,6 +834,23 @@ class MemoryQueue(private val etcdClient: EtcdClient, | |||
} | |||
} | |||
|
|||
|
|||
private def handleStaleActivationsWhenUpdateAction(queueManager: ActorRef): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private def handleStaleActivationsWhenUpdateAction(queueManager: ActorRef): Unit = { | |
private def handleStaleActivationsWhenActionUpdated(queueManager: ActorRef): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated the method name accordingly.
@@ -561,6 +561,20 @@ class MemoryQueue(private val etcdClient: EtcdClient, | |||
// let QueueManager know this queue is no longer in charge. | |||
context.parent ! staleQueueRemovedMsg | |||
|
|||
if (queue.size > 0) { | |||
// if doesn't exist old container to pull old memoryQueue's activation, complete the activation directly | |||
if (containers.size == 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, so this is to promote the activation revision by sending them to QueueManager.
I will merge this pr due to |
This is bug, if doesn't fix, the activation will be failed as below
container creation retry
This is enhance point, during retry,
scheduler_inProgressJobRetentionSecond
toscheduler_inProgressJobRetention
Description
Related issue and scope
My changes affect the following components
Types of changes
Checklist: